package com.stars.distributed.schedule.core;

import com.stars.distributed.schedule.bean.DbScheduleConfig;
import com.stars.distributed.schedule.bean.DbScheduleServer;
import com.stars.distributed.schedule.exception.DistributedScheduleRuntimeException;
import com.stars.distributed.schedule.service.DistributedScheduleService;
import com.stars.distributed.schedule.enums.ManagerStatus;
import com.stars.distributed.schedule.enums.Switch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Manager of the distributed scheduling framework:
 * 1. Initializes the parameters and configuration required by distributed scheduling.
 * 2. Initializes the thread pool: each process may have only one task-status maintenance
 *    thread and one thread controller at a time.
 * 3. Manages starting, suspending, resuming and stopping the framework.
 *
 * <p>Lifecycle methods are {@code synchronized} on this instance, so {@code managerStatus}
 * transitions are serialized. Accesses to {@code dbScheduleServerConfig} made inside this
 * class are additionally guarded by {@code serverConfigLock}.
 *
 * @author guoguifang
 */
public final class DistributedScheduleManager {

    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduleManager.class);

    /**
     * Lazily created singleton. Declared {@code volatile} so that the double-checked
     * locking in {@link #getSingleInstance()} publishes a fully constructed instance.
     */
    private static volatile DistributedScheduleManager distributedScheduleManager;

    /**
     * Global configuration of distributed scheduling.
     */
    private DbScheduleConfig dbScheduleConfig;

    /**
     * Effective server configuration of distributed scheduling (copied from the database
     * record or from the local configuration during {@link #initServerConfig()}).
     */
    private final DbScheduleServer dbScheduleServerConfig = new DbScheduleServer();

    /**
     * Lock guarding {@link #dbScheduleServerConfig}.
     */
    private final ReentrantLock serverConfigLock = new ReentrantLock();

    /**
     * Server configuration supplied locally; used as lookup key and as fallback record
     * to insert when the database has no entry yet.
     */
    private DbScheduleServer localDbScheduleServerConfig;

    /**
     * Current lifecycle status. The manager starts out disabled.
     */
    private ManagerStatus managerStatus = ManagerStatus.DISABLED;

    private DistributedScheduleManager() {
        this.dbScheduleConfig = DistributedScheduleService.getInstance().getDbScheduleConfig();
    }

    /**
     * Returns the process-wide manager instance, creating it on first use.
     *
     * @return the singleton manager
     */
    public static DistributedScheduleManager getSingleInstance() {
        if (distributedScheduleManager == null) {
            synchronized (DistributedScheduleManager.class) {
                if (distributedScheduleManager == null) {
                    distributedScheduleManager = new DistributedScheduleManager();
                }
            }
        }
        return distributedScheduleManager;
    }

    /**
     * Runs the full start-up sequence: server configuration, heartbeat, then scheduler.
     * The sequence stops at the first step whose status does not reach its success level.
     */
    public synchronized void start() {
        // Initialize the server configuration; stop here if that did not succeed.
        initServerConfig();
        if (managerStatus.STATUS_CODE < ManagerStatus.SERVER_CONFIG_INIT_SUCCESS.STATUS_CODE) {
            return;
        }

        // Start the heartbeat; stop here if that did not succeed.
        startHeartbeat();
        if (managerStatus.STATUS_CODE < ManagerStatus.HEARTBEAT_START_SUCCESS.STATUS_CODE) {
            return;
        }

        // Start the scheduler.
        startSchedule();
    }

    /**
     * Initializes the server configuration. On success the status becomes
     * {@code SERVER_CONFIG_INIT_SUCCESS}; on failure it becomes {@code SERVER_CONFIG_INIT_FAIL}.
     * Does nothing when the manager is disabled or the configuration is already initialized.
     */
    public synchronized void initServerConfig() {
        // A disabled manager must not start any part of the framework.
        if (managerStatus == ManagerStatus.DISABLED) {
            logger.warn("The distributed scheduling framework is disabled, can't init server config!");
            return;
        }
        // Nothing to do when the configuration has already been initialized.
        if (managerStatus.STATUS_CODE >= ManagerStatus.SERVER_CONFIG_INIT_SUCCESS.STATUS_CODE) {
            logger.debug("The distributed scheduling framework configuration has been initialized successfully!");
            return;
        }
        serverConfigLock.lock();
        try {
            // Look up the stored configuration using the local configuration as key;
            // when nothing is stored yet, insert the local configuration instead.
            DistributedScheduleService service = DistributedScheduleService.getInstance();
            DbScheduleServer storedConfig = service.getDbScheduleServerConfig(localDbScheduleServerConfig);
            if (storedConfig != null) {
                dbScheduleServerConfig.copy(storedConfig);
            } else if (service.insertDbScheduleServer(localDbScheduleServerConfig) > 0) {
                dbScheduleServerConfig.copy(localDbScheduleServerConfig);
            } else {
                managerStatus = ManagerStatus.SERVER_CONFIG_INIT_FAIL;
                logger.error("The distributed scheduling framework config initialize failure!");
                return;
            }
        } catch (Exception e) {
            managerStatus = ManagerStatus.SERVER_CONFIG_INIT_FAIL;
            logger.error("The distributed scheduling framework config initialize failure!", e);
            return;
        } finally {
            serverConfigLock.unlock();
        }
        managerStatus = ManagerStatus.SERVER_CONFIG_INIT_SUCCESS;
        logger.info("The distributed scheduling framework configuration initialize success!");
    }

    /**
     * Starts the heartbeat and the server-configuration puller. On success the status becomes
     * {@code HEARTBEAT_START_SUCCESS}; on failure it becomes {@code HEARTBEAT_START_FAIL}.
     * Does nothing when the manager is disabled, the configuration is not initialized,
     * or the heartbeat is already running.
     */
    public synchronized void startHeartbeat() {
        // A disabled manager must not start any part of the framework.
        if (managerStatus == ManagerStatus.DISABLED) {
            logger.warn("The distributed scheduling framework is disabled, can't start heartbeat!");
            return;
        }
        // The server configuration must have been initialized first.
        if (managerStatus.STATUS_CODE < ManagerStatus.SERVER_CONFIG_INIT_SUCCESS.STATUS_CODE) {
            logger.warn("The distributed scheduling framework configuration is not initialized successfully, can't start heartbeat!");
            return;
        }
        // Nothing to do when the heartbeat has already been started.
        if (managerStatus.STATUS_CODE >= ManagerStatus.HEARTBEAT_START_SUCCESS.STATUS_CODE) {
            logger.debug("The distributed scheduling framework heartbeat has been started successfully!");
            return;
        }
        // Start the heartbeat and the server-configuration puller.
        try {
            DistributedScheduleHeartbeat.getSingleInstance().start();
            DistributedSchedulePuller.getSingleInstance().start();
        } catch (Exception e) {
            managerStatus = ManagerStatus.HEARTBEAT_START_FAIL;
            logger.error("The distributed scheduling framework start heartbeat failure!", e);
            // Return so the failure status is not overwritten by the success status below.
            return;
        }
        managerStatus = ManagerStatus.HEARTBEAT_START_SUCCESS;
        logger.info("The distributed scheduling framework start heartbeat success!");
    }

    /**
     * Starts the scheduler, the task executor manager and the task catcher. On success the
     * status becomes {@code SCHEDULE_START_SUCCESS}; on failure it becomes
     * {@code SCHEDULE_START_FAIL}. Does nothing when the manager is disabled, the schedule
     * switch is closed, the heartbeat is not running, or the scheduler is already started.
     */
    public synchronized void startSchedule() {
        // A disabled manager must not start any part of the framework.
        if (managerStatus == ManagerStatus.DISABLED) {
            logger.warn("The distributed scheduling framework is not enabled, can't start schedule!");
            return;
        }
        // The schedule switch in the server configuration must be open.
        serverConfigLock.lock();
        try {
            if (Switch.CLOSE == Switch.forCode(dbScheduleServerConfig.getScheduleSwitch())) {
                logger.warn("The distributed scheduling framework schedule switch is not open, can't start schedule!");
                return;
            }
        } finally {
            serverConfigLock.unlock();
        }

        // The heartbeat must have been started first.
        if (managerStatus.STATUS_CODE < ManagerStatus.HEARTBEAT_START_SUCCESS.STATUS_CODE) {
            logger.warn("The distributed scheduling framework heartbeat does not start successfully, can't start schedule!");
            return;
        }
        // Nothing to do when the scheduler has already been started.
        if (managerStatus.STATUS_CODE >= ManagerStatus.SCHEDULE_START_SUCCESS.STATUS_CODE) {
            logger.debug("The distributed scheduling framework schedule has been started successfully!");
            return;
        }
        try {
            // Start the scheduler, the task executor manager and the task catcher.
            DistributedScheduler.getSingleInstance().start();
            DistributedTaskExecutorManager.getSingleInstance().start();
            DistributedScheduleCatcher.getSingleInstance().start();
        } catch (Exception e) {
            managerStatus = ManagerStatus.SCHEDULE_START_FAIL;
            logger.error("The distributed scheduling framework schedule start failure!", e);
            // Return so the failure status is not overwritten by the success status below.
            return;
        }
        managerStatus = ManagerStatus.SCHEDULE_START_SUCCESS;
        logger.info("The distributed scheduling framework schedule start success!");
    }

    /**
     * Stops the task catcher and the scheduler and moves the status to
     * {@code SCHEDULE_STANDBY}. Does nothing when the scheduler was never started.
     * Stopping is best effort: the status moves to standby even when a component
     * throws, but the success message is only logged for a clean stop.
     */
    public synchronized void stopSchedule() {
        // The scheduler must have been started before it can be stopped.
        if (managerStatus.STATUS_CODE < ManagerStatus.SCHEDULE_START_SUCCESS.STATUS_CODE) {
            logger.warn("The distributed scheduling framework schedule does not start successfully, can't stop schedule!");
            return;
        }
        try {
            // Stop the task catcher and the scheduler.
            // NOTE(review): DistributedTaskExecutorManager is deliberately not stopped here
            // (the call was disabled in the original code) - confirm this is still intended.
            DistributedScheduleCatcher.getSingleInstance().stop();
            DistributedScheduler.getSingleInstance().stop();
        } catch (Exception e) {
            managerStatus = ManagerStatus.SCHEDULE_STANDBY;
            logger.error("The distributed scheduling framework schedule stop failure!", e);
            return;
        }
        managerStatus = ManagerStatus.SCHEDULE_STANDBY;
        logger.info("The distributed scheduling framework schedule stop success!");
    }

    /**
     * Suspends the scheduler and the task catcher and moves the status to
     * {@code SCHEDULE_PAUSE}. Does nothing when the scheduler was never started
     * or is already suspended.
     */
    public synchronized void suspendSchedule() {
        // The scheduler must have been started before it can be suspended.
        if (managerStatus.STATUS_CODE < ManagerStatus.SCHEDULE_START_SUCCESS.STATUS_CODE) {
            logger.warn("The distributed scheduling framework schedule does not start successfully, can't suspend schedule!");
            return;
        }
        // Nothing to do when the scheduler is already suspended.
        if (managerStatus == ManagerStatus.SCHEDULE_PAUSE) {
            logger.debug("The distributed scheduling framework schedule has been suspended successfully!");
            return;
        }
        // Suspend the scheduler and the task catcher.
        // NOTE(review): DistributedTaskExecutorManager is deliberately not suspended here
        // (the call was disabled in the original code) - confirm this is still intended.
        DistributedScheduler.getSingleInstance().suspend();
        DistributedScheduleCatcher.getSingleInstance().suspend();
        managerStatus = ManagerStatus.SCHEDULE_PAUSE;
        logger.info("The distributed scheduling framework schedule suspend success!");
    }

    /**
     * Resumes a suspended scheduler and task catcher and moves the status back to
     * {@code SCHEDULE_START_SUCCESS}. Does nothing when the scheduler was never started
     * or is not currently suspended.
     */
    public synchronized void resumeSchedule() {
        // The scheduler must have been started before it can be resumed.
        if (managerStatus.STATUS_CODE < ManagerStatus.SCHEDULE_START_SUCCESS.STATUS_CODE) {
            logger.warn("The distributed scheduling framework schedule does not start successfully, can't resume schedule!");
            return;
        }
        // Only a suspended scheduler can be resumed.
        if (managerStatus != ManagerStatus.SCHEDULE_PAUSE) {
            logger.debug("The distributed scheduling framework schedule is not suspended!");
            return;
        }
        // Resume the task catcher and the scheduler.
        // NOTE(review): DistributedTaskExecutorManager is deliberately not resumed here
        // (the call was disabled in the original code) - confirm this is still intended.
        DistributedScheduleCatcher.getSingleInstance().resume();
        DistributedScheduler.getSingleInstance().resume();
        managerStatus = ManagerStatus.SCHEDULE_START_SUCCESS;
        logger.info("The distributed scheduling framework schedule resume success!");
    }

    /**
     * Wakes up the task scheduler by delegating to {@code DistributedScheduler.signalIdle()}.
     */
    public void signalIdleScheduler() {
        DistributedScheduler.getSingleInstance().signalIdle();
    }

    /**
     * Asks the scheduler to handle tasks whose main-task and sub-task counts do not match,
     * by delegating to {@code DistributedScheduler.needInitMismatchTask()}.
     */
    public void needInitMismatchTask() {
        DistributedScheduler.getSingleInstance().needInitMismatchTask();
    }

    /**
     * Enables a disabled manager by moving it to {@code SERVER_CONFIG_PENDING_INIT}.
     * Has no effect in any other status. Synchronized like the other lifecycle methods
     * so the status check and update are atomic and visible to them.
     */
    public synchronized void enable() {
        if (this.managerStatus == ManagerStatus.DISABLED) {
            this.managerStatus = ManagerStatus.SERVER_CONFIG_PENDING_INIT;
        }
    }

    /**
     * Sets the core pool size of the distributed task thread pool.
     *
     * @param corePoolSize new core size; must be in {@code [0, current max pool size]}
     * @throws DistributedScheduleRuntimeException if {@code corePoolSize} is out of range
     */
    public void setCorePoolSize(int corePoolSize) {
        serverConfigLock.lock();
        try {
            if (corePoolSize < 0 || corePoolSize > this.dbScheduleServerConfig.getMaxPoolSize()) {
                throw new DistributedScheduleRuntimeException("The core pool size of distributed schedule must be greater than or equal to 0 and less than or equal to max pool size!");
            }
            // Only propagate an actual change.
            if (corePoolSize != this.dbScheduleServerConfig.getCorePoolSize()) {
                this.dbScheduleServerConfig.setCorePoolSize(corePoolSize);
                DistributedTaskExecutorManager.getSingleInstance().setCorePoolSize(corePoolSize);
            }
        } finally {
            serverConfigLock.unlock();
        }
    }

    /**
     * Sets the maximum pool size of the distributed task thread pool.
     *
     * @param maxPoolSize new maximum size; must be positive and not less than the current
     *                    core pool size
     * @throws DistributedScheduleRuntimeException if {@code maxPoolSize} is out of range
     */
    public void setMaxPoolSize(int maxPoolSize) {
        serverConfigLock.lock();
        try {
            if (maxPoolSize <= 0 || maxPoolSize < this.dbScheduleServerConfig.getCorePoolSize()) {
                throw new DistributedScheduleRuntimeException("The max pool size of distributed schedule must be greater than 0 and greater than or equal to core pool size!");
            }
            // Only propagate an actual change.
            if (maxPoolSize != this.dbScheduleServerConfig.getMaxPoolSize()) {
                this.dbScheduleServerConfig.setMaxPoolSize(maxPoolSize);
                DistributedTaskExecutorManager.getSingleInstance().setMaxPoolSize(maxPoolSize);
            }
        } finally {
            serverConfigLock.unlock();
        }
    }

    /**
     * Sets the core and the maximum pool size of the distributed task thread pool together.
     *
     * @param corePoolSize new core size; must be non-negative and not greater than
     *                     {@code maxPoolSize}
     * @param maxPoolSize  new maximum size; must be positive
     * @throws DistributedScheduleRuntimeException if the sizes are out of range
     */
    public void setPoolSize(int corePoolSize, int maxPoolSize) {
        serverConfigLock.lock();
        try {
            if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize) {
                throw new DistributedScheduleRuntimeException("The core pool size of distributed schedule must be greater than or equal to 0, and the max pool size must be greater than 0 and greater than or equal to core pool size!");
            }
            // Only propagate an actual change.
            if (corePoolSize != this.dbScheduleServerConfig.getCorePoolSize() || maxPoolSize != this.dbScheduleServerConfig.getMaxPoolSize()) {
                this.dbScheduleServerConfig.setCorePoolSize(corePoolSize);
                this.dbScheduleServerConfig.setMaxPoolSize(maxPoolSize);
                DistributedTaskExecutorManager.getSingleInstance().setPoolSize(corePoolSize, maxPoolSize);
            }
        } finally {
            serverConfigLock.unlock();
        }
    }

    /**
     * Sets the maximum idle time of non-core threads.
     *
     * @param keepAliveTimeMilli new keep-alive time in milliseconds; must be non-negative
     * @throws DistributedScheduleRuntimeException if {@code keepAliveTimeMilli} is negative
     */
    public void setKeepAliveTime(long keepAliveTimeMilli) {
        serverConfigLock.lock();
        try {
            if (keepAliveTimeMilli < 0) {
                throw new DistributedScheduleRuntimeException("The keep alive time of distributed schedule must be greater than or equal to 0!");
            }
            // Only propagate an actual change.
            if (keepAliveTimeMilli != this.dbScheduleServerConfig.getKeepAliveTimeMilli()) {
                this.dbScheduleServerConfig.setKeepAliveTimeMilli(keepAliveTimeMilli);
                DistributedTaskExecutorManager.getSingleInstance().setKeepAliveTime(keepAliveTimeMilli, TimeUnit.MILLISECONDS);
            }
        } finally {
            serverConfigLock.unlock();
        }
    }

    /**
     * Replaces the global distributed scheduling configuration.
     *
     * @param dbScheduleConfig the new global configuration
     */
    public void setDbScheduleConfig(DbScheduleConfig dbScheduleConfig) {
        this.dbScheduleConfig = dbScheduleConfig;
    }

    /**
     * Sets the local server configuration. Only takes effect while no local
     * configuration has been set yet; later calls are ignored.
     *
     * @param localDbScheduleServerConfig the local server configuration
     */
    public void setLocalDbScheduleServerConfig(DbScheduleServer localDbScheduleServerConfig) {
        if (this.localDbScheduleServerConfig == null) {
            this.localDbScheduleServerConfig = localDbScheduleServerConfig;
        }
    }

    /**
     * @return the global distributed scheduling configuration
     */
    public DbScheduleConfig getDbScheduleConfig() {
        return this.dbScheduleConfig;
    }

    /**
     * Returns the effective server configuration. This is the live internal instance,
     * not a copy, and it is returned without acquiring {@code serverConfigLock}.
     *
     * @return the effective server configuration
     */
    public DbScheduleServer getDbScheduleServerConfig() {
        return this.dbScheduleServerConfig;
    }

}
